Cloud Composer でDAGの実行状況をSlackに通知したい
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
やりたいこと
- Cloud Composer でDAGの実行状況をSlackに通知したい
Airflowでは、DAGやタスクの成功、失敗、リトライなどのイベントに応じてコールバック関数を設定できます。
今回は、コールバック関数を使用してDAGの実行状況をSlackに通知することを試してみたのでご紹介します。
コールバック関数
コールバック関数は、DAGの特定のイベント(成功、失敗、リトライなど)が発生したときに実行される関数です。
これにより、イベントが発生した際のアクションを自動で実行することができます。
例えば、タスクが失敗したときにアラートを発したり、DAG内の最後のタスクが成功したときの事後処理を行うことができます。
コールバック関数をトリガーできるタスクイベントには複数種類ありますが、今回は下記3つを使用しました。
名前 | 説明 |
---|---|
on_success_callback | タスクが成功したときに呼び出されます |
on_failure_callback | タスクが失敗したときに呼び出されます |
on_retry_callback | タスクが再試行されるときに呼び出されます |
なお、コールバック関数は、ワーカーによる実行によりタスクの状態が変化した場合にのみ呼び出されます。
そのため、コマンドラインインターフェイス(CLI)またはユーザーインターフェイス(UI)によって設定されたタスクの変更では、コールバック関数は実行されません。
コールバック関数の詳細については、下記ドキュメントを参照ください。
Cloud Composer 環境を作成
DAGを動かす Cloud Composer 環境を作成します。
Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。
test-composer
という名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。
Slack パッケージをインストール
今回は、SlackでWebhook URLを発行して通知を行うため、SlackWebhookOperator
を利用します。
apache-airflow-providers-slack
パッケージをインストールする必要があります。
インストール手順
作成したCloud Composer環境の設定ページに移動します。
[PyPI パッケージ]セクションのパッケージ名にapache-airflow-providers-slack
を入力します。
[保存]ボタンをクリックするとパッケージのインストールが始まります。
インストールが完了しました。
Airflow接続を追加する
Slackに通知を送るための情報をAirflow接続を追加します。
これにより、DAGのコード内で直接認証情報を記述する必要がなくなります。
接続の追加手順
Cloud Composer環境のAirflow UIにアクセスします。
[Admin] - [Connections]をクリックします。
Airflow接続の管理画面が表示されますので、[+]ボタンをクリックします。
Airflow接続の編集画面が表示されますので、以下の値を入力します。
- Connection Id
slack_webhook_conn
- Connection Type
HTTP
- Host
https://hooks.slack.com/services/
- Password
認証トークン(https://hooks.slack.com/services/のあとに続く文字列)
[save]ボタンをクリックして、Airflow接続を保存します。
Airflow接続にslack_webhook_conn
が追加されました。
DAG を作成
DAGを作成し、コールバック関数を組み込みます。
以下は、今回試したDAGのコードです。
DAG成功時、失敗時、リトライ時にSlack通知を送るコールバック関数を定義しています。
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator # DAG成功時のコールバック関数 def dag_success_alert(context): message = f""" *DAG '{context.get('dag').dag_id}'* executed successfully! :white_check_mark: Run ID: {context['run_id']} """ slack_notification(context, message) print(message) # Task失敗時のコールバック関数 def task_failure_alert(context): message = f""" *Task '{context.get('task_instance').task_id}'* in DAG '{context.get('dag').dag_id}' failed! :x: Run ID: {context['run_id']} """ slack_notification(context, message) print(message) # Taskリトライ時のコールバック関数 def task_retry_alert(context): message = f""" *Task '{context.get('task_instance').task_id}'* in DAG '{context.get('dag').dag_id}' is being retried! :arrows_counterclockwise: Run ID: {context['run_id']} """ slack_notification(context, message) print(message) # Slack通知を送信する関数 def slack_notification(context, message): slack_notification = SlackWebhookOperator( task_id='slack_notification', slack_webhook_conn_id='slack_webhook_conn', message=message, username='Airflow', ) return slack_notification.execute(context=context) def retry_function(**context): # タスクが1回目の実行で失敗するような条件を設定 if context.get('ti').try_number == 1: raise Exception("Simulating task failure") else: print("Task succeeded in retry") default_args = { 'start_date': datetime(2022, 1, 1), 'retries': 0, 'retry_delay': timedelta(minutes=1), 'on_success_callback': None, 'on_failure_callback': task_failure_alert, 'on_retry_callback': task_retry_alert, } with DAG( dag_id='sample_dag_with_callbacks', default_args=default_args, schedule_interval=None, catchup=False, ) as dag: start_task = DummyOperator( task_id='start_task' ) retry_task = PythonOperator( task_id='retry_task', python_callable=retry_function, provide_context=True, ) end_task = DummyOperator( task_id='end_task', on_success_callback=[dag_success_alert] ) start_task >> retry_task >> end_task
コードの説明
- dag_success_alert
DAG成功時にSlack通知を送るコールバック関数
DAG内の最後のタスクend_task
のon_success_callback
に登録 - task_failure_alert
Task失敗時にSlack通知を送るコールバック関数
DAGのon_failure_callback
に登録 - task_retry_alert
Taskリトライ時にSlack通知を送るコールバック関数
DAGのon_retry_callback
に登録 - slack_notification
実際にSlackに通知を送る関数
SlackWebhookOperator
を使用 - retry_function 1回目の実行で失敗し、リトライ時に成功する関数
リトライ設定について
retry_task
タスクは1回目の実行で失敗するため、task_failure_alert関数が実行されることを想定しています。
失敗時にretry_task
タスクがリトライしてしまわないように、DAGのdefault_args
でリトライ回数なし('retries': 0
)を設定しています。
default_args = { 'start_date': datetime(2022, 1, 1), 'retries': 0, 'retry_delay': timedelta(minutes=1), 'on_success_callback': None, 'on_failure_callback': task_failure_alert, 'on_retry_callback': task_retry_alert, }
リトライ通知および成功通知のテストは、リトライ回数ありに設定を変更した後に行います。
DAGを実行
DAGを実行してみましょう。
手動でDAGをトリガーします。
[DAGs]ページのsample_dag_with_callbacks
をクリックしてDAGの詳細画面に移動し、画面右上の [再生マーク]ボタンをクリックします。
DAG が起動し、想定通りretry_task
タスクで失敗しました。
on_failure_callback
に設定しているtask_failure_alert関数がトリガーされ、タスクの失敗がSlackに通知されました!
次に、タスクがリトライした後に成功する状況を発生させてみます。
retry_task
タスクがリトライされるように、リトライ設定(retries
)の値を変更します。
DAGのコードを下記のように変更してデプロイします。
default_args = { 'start_date': datetime(2022, 1, 1), # 'retries': 0, 'retries': 1, 'retry_delay': timedelta(minutes=1), 'on_success_callback': None, 'on_failure_callback': task_failure_alert, 'on_retry_callback': task_retry_alert, }
手動でDAGをトリガーします。
DAG が起動し、retry_task
タスクがリトライされました。
このタイミングでon_retry_callback
に設定しているtask_retry_alert関数がトリガーされます。
リトライ後のタスクは成功しました。
on_success_callback
に設定しているdag_success_alert関数がトリガーされます。
Slackを確認すると、タスクのリトライの後に成功の通知がされていました!
まとめ
以上、Cloud ComposerでDAGの実行状況をSlackに通知する方法をご紹介しました。
コールバック関数を利用することで、DAGやタスクのステータスに応じた通知を自動化することができました。
これにより、タスクの失敗やリトライ、成功といった重要なイベントをリアルタイムで把握できるため、運用の効率化と迅速な対応が可能になります。
使用できるコールバックの種類は、タスクの成功、失敗、リトライ以外にもあるようなので、他も試してみたいと思います。
今回の検証が誰かのお役に立てれば幸いです!
参考
- Managing Connections — Airflow Documentation
- Airflow 接続を管理する | Cloud Composer | Google Cloud
- airflow.providers.slack.operators.slack_webhook — apache-airflow-providers-slack Documentation
- apache-airflow-providers-slack — apache-airflow-providers-slack Documentation
- Callbacks — Airflow Documentation
- Cloud Composer の Python 依存関係をインストールする | Google Cloud
- Cloud Composer の概要 | Google Cloud
- Cloud Composer 環境を作成する | Google Cloud
- 【テックコラム】Airflowでslack通知を簡単に実装する方法 | DataCurrent